package com.demo.join;

import com.demo.bean.OrderLogEvent1;
import com.demo.bean.OrderLogEvent2;
import com.demo.bean.OrderLogEvent3;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * Demo job that joins two order-log streams read from classpath CSV files using
 * tumbling event-time windows.
 *
 * <p>Two variants are shown: an inner join via {@code join(...)} with a
 * {@link JoinFunction}, and a left outer join via {@code coGroup(...)} with a
 * {@link CoGroupFunction}.
 */
public class TumblingWindowJoin {
    /**
     * Size of the tumbling event-time windows used by both the join and the coGroup.
     * NOTE(review): earlier comments described this as a 5-minute window, but the code
     * has always configured 5 seconds; the configured value is kept unchanged.
     */
    private static final Time WINDOW_SIZE = Time.seconds(5);

    /** Maximum out-of-orderness passed to the watermark/timestamp extractors. */
    private static final Time MAX_OUT_OF_ORDERNESS = Time.seconds(5);

    /**
     * Parses a time string in {@code yyyy-MM-dd HH:mm} format into epoch milliseconds.
     *
     * @param timeStr the time string to parse
     * @return the parsed time in milliseconds, or {@code -1} if the string cannot be parsed
     *         (the parse error is printed to standard error)
     */
    public static long getTime(String timeStr) {
        // A new formatter per call: SimpleDateFormat is not thread-safe, so it is not shared.
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
        try {
            return sdf.parse(timeStr).getTime();
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return -1L;
    }

    /**
     * Resolves a classpath resource to its path string.
     *
     * @param resourceName absolute classpath name, e.g. {@code /OrderLogEvent1.csv}
     * @return the path component of the resource URL
     * @throws NullPointerException if the resource is not on the classpath
     */
    private static String resourcePath(String resourceName) {
        return Objects.requireNonNull(
                TumblingWindowJoin.class.getResource(resourceName),
                "classpath resource not found: " + resourceName).getPath();
    }

    /**
     * Finds the first right-side event whose order id equals that of the left-side event.
     *
     * @param left   the left-side event
     * @param rights the right-side events of the same window
     * @return the first matching right-side event, or {@code null} if none matches
     */
    private static OrderLogEvent2 findFirstMatch(OrderLogEvent1 left, Iterable<OrderLogEvent2> rights) {
        for (OrderLogEvent2 right : rights) {
            // Objects.equals compares by value whether getOrderId() returns long or Long;
            // '==' would compare references for boxed values.
            if (Objects.equals(right.getOrderId(), left.getOrderId())) {
                return right;
            }
        }
        return null;
    }

    /**
     * Builds and runs the job: reads both CSV files, assigns event-time timestamps,
     * keys both streams by order id, and prints the inner-join and left-join results.
     *
     * @param args unused
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String event1Csv = resourcePath("/OrderLogEvent1.csv");

        // Left stream: CSV line -> OrderLogEvent1 built from columns 0, 1 and the parsed time in column 2.
        KeyedStream<OrderLogEvent1, Long> leftOrderStream = env.readTextFile(event1Csv)
                .map(new MapFunction<String, OrderLogEvent1>() {
                    @Override
                    public OrderLogEvent1 map(String line) throws Exception {
                        String[] fields = line.split(",");
                        return new OrderLogEvent1(Long.valueOf(fields[0]), Double.valueOf(fields[1]), getTime(fields[2]));
                    }
                }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderLogEvent1>(MAX_OUT_OF_ORDERNESS) {
                    @Override
                    public long extractTimestamp(OrderLogEvent1 element) {
                        return element.getTimeStamp();
                    }
                }).keyBy(OrderLogEvent1::getOrderId);
        leftOrderStream.print("leftOrderStream");

        String event2Csv = resourcePath("/OrderLogEvent2.csv");

        // Right stream: CSV line -> OrderLogEvent2 built from columns 0, 1 and the parsed time in column 2.
        KeyedStream<OrderLogEvent2, Long> rightOrderStream = env.readTextFile(event2Csv)
                .map(new MapFunction<String, OrderLogEvent2>() {
                    @Override
                    public OrderLogEvent2 map(String line) throws Exception {
                        String[] fields = line.split(",");
                        return new OrderLogEvent2(Long.valueOf(fields[0]), Long.valueOf(fields[1]), getTime(fields[2]));
                    }
                }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderLogEvent2>(MAX_OUT_OF_ORDERNESS) {
                    @Override
                    public long extractTimestamp(OrderLogEvent2 element) {
                        return element.getTimeStamp();
                    }
                }).keyBy(OrderLogEvent2::getOrderId);
        rightOrderStream.print("rightOrderStream");

        // Sample data (row 1: left "orderId,amount"; row 2: event time; row 3: right "orderId,itemId"):
        //1,22.1	2,22.2	4,22.3	4,22.4	5,22.5	6,22.6
        //13:01	    13:03	13:04	13:05	13:07	13:09
        //1,121	    2,122	3,123	4,124	5,125	7,126

        // Tumbling Window Join
        // A tumbling window slices the stream into fixed-length windows that do not overlap.
        // https://blog.csdn.net/qq_37142346/article/details/90600020

        // How the start time of a time window is determined: https://blog.csdn.net/lp284558195/article/details/114391637

        // Inner join. The example is not ideal because the sample data is minute-granular:
        // per the sample data above, the orderId=4 event with amount=22.3 has no right-side
        // event with the same order id in its window, so nothing is joined for it.
        leftOrderStream.join(rightOrderStream)
                .where(OrderLogEvent1::getOrderId)
                .equalTo(OrderLogEvent2::getOrderId)
                .window(TumblingEventTimeWindows.of(WINDOW_SIZE))
                .apply(new JoinFunction<OrderLogEvent1, OrderLogEvent2, OrderLogEvent3>() {
                    @Override
                    public OrderLogEvent3 join(OrderLogEvent1 orderLogEvent1, OrderLogEvent2 orderLogEvent2) throws Exception {
                        return new OrderLogEvent3(orderLogEvent1.getOrderId(), orderLogEvent1.getAmount(), orderLogEvent2.getItemId());
                    }
                }).print("tumblingWindowJoinDataStream--JoinFunction");

        // Left outer join via coGroup. For the same sample data, the orderId=4 event with
        // amount=22.3 has no right-side partner in its window, so its item id is emitted as null.
        leftOrderStream.coGroup(rightOrderStream)
                .where(OrderLogEvent1::getOrderId)
                .equalTo(OrderLogEvent2::getOrderId)
                .window(TumblingEventTimeWindows.of(WINDOW_SIZE))
                .apply(new CoGroupFunction<OrderLogEvent1, OrderLogEvent2, OrderLogEvent3>() {
                    @Override
                    public void coGroup(Iterable<OrderLogEvent1> logEvent1Iterable, Iterable<OrderLogEvent2> logEvent2Iterable, Collector<OrderLogEvent3> collector) throws Exception {
                        // Emit exactly one record per left event: paired with the first matching
                        // right event if there is one, otherwise with a null item id.
                        for (OrderLogEvent1 logEvent1 : logEvent1Iterable) {
                            OrderLogEvent2 matched = findFirstMatch(logEvent1, logEvent2Iterable);
                            // Only a genuinely matching right event contributes its item id; a
                            // non-matching event that merely happened to be iterated last must not.
                            collector.collect(new OrderLogEvent3(logEvent1.getOrderId(), logEvent1.getAmount(), matched != null ? matched.getItemId() : null));
                        }
                    }
                }).print("CoGroupFunction-leftJoin");

        env.execute("TumblingWindowJoin");
    }
}
